/**************************************************************************************
 * Copyright (c) 2016, Tomoaki Yamaguchi
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
 *    Tieto Poland Sp. z o.o. - Gateway improvements
 **************************************************************************************/

#include "MQTTSNGWPublishHandler.h"
#include "MQTTSNGWPacket.h"
#include "MQTTGWPacket.h"
#include "MQTTSNGateway.h"
#include "MQTTSNGWClient.h"
#include "MQTTSNGWQoSm1Proxy.h"
#include <string.h>
using namespace std;
using namespace MQTTSNGW;

MQTTSNPublishHandler::MQTTSNPublishHandler(Gateway* gateway)
{
    _gateway = gateway;
}

MQTTSNPublishHandler::~MQTTSNPublishHandler()
{

}

MQTTGWPacket* MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet)
{
    uint8_t dup;
    int qos;
    uint8_t retained;
    uint16_t msgId;
    uint16_t tid;
    uint8_t* payload;
    MQTTSN_topicid topicid;
    int payloadlen;
    Publish pub = MQTTPacket_Publish_Initializer;

    char shortTopic[2];

    if (!_gateway->getAdapterManager()->getQoSm1Proxy()->isActive())
    {
        if (client->isQoSm1())
        {
            _gateway->getAdapterManager()->getQoSm1Proxy()->savePacket(client, packet);

            return nullptr;
        }
    }

    if (packet->getPUBLISH(&dup, &qos, &retained, &msgId, &topicid, &payload, &payloadlen) == 0)
    {
        return nullptr;
    }
    pub.msgId = msgId;
    pub.header.bits.dup = dup;
    pub.header.bits.qos = (qos == 3 ? 0 : qos);
    pub.header.bits.retain = retained;
    tid = topicid.data.id;

    Topic* topic = nullptr;

    if (topicid.type == MQTTSN_TOPIC_TYPE_SHORT)
    {
        shortTopic[0] = topicid.data.short_name[0];
        shortTopic[1] = topicid.data.short_name[1];
        pub.topic = shortTopic;
        pub.topiclen = 2;
    }
    else
    {
        topic = client->getTopics()->getTopicById(&topicid);
        if (!topic)
        {
            topic = _gateway->getTopics()->getTopicById(&topicid);
            if (topic)
            {
                topic = client->getTopics()->add(topic->getTopicName()->c_str(), topic->getTopicId());
            }
        }

        if (!topic && qos == 3)
        {
            WRITELOG("%s Invalid TopicId.%s %s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
            return nullptr;
        }

        if ((qos == 0 || qos == 3) && msgId > 0)
        {
            WRITELOG("%s Invalid MsgId.%s %s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
            return nullptr;
        }

        if (!topic && msgId && qos > 0 && qos < 3)
        {
            /* Reply PubAck with INVALID_TOPIC_ID to the client */
            MQTTSNPacket* pubAck = new MQTTSNPacket();
            pubAck->setPUBACK(topicid.data.id, msgId, MQTTSN_RC_REJECTED_INVALID_TOPIC_ID);
            Event* ev1 = new Event();
            ev1->setClientSendEvent(client, pubAck);
            _gateway->getClientSendQue()->post(ev1);
            return nullptr;
        }
        if (topic)
        {
            pub.topic = (char*) topic->getTopicName()->data();
            pub.topiclen = topic->getTopicName()->length();
            topicid.data.long_.name = pub.topic;
            topicid.data.long_.len = pub.topiclen;
        }
    }
    /* Save a msgId & a TopicId pare for PUBACK */
    if (msgId && qos > 0 && qos < 3)
    {
        client->setWaitedPubTopicId(msgId, tid, &topicid);
    }

    pub.payload = (char*) payload;
    pub.payloadlen = payloadlen;

    MQTTGWPacket* publish = new MQTTGWPacket();
    publish->setPUBLISH(&pub);

    if (_gateway->getAdapterManager()->isAggregaterActive() && client->isAggregated())
    {
        return publish;
    }
    else
    {
        Event* ev1 = new Event();
        ev1->setBrokerSendEvent(client, publish);
        _gateway->getBrokerSendQue()->post(ev1);
        return nullptr;
    }
}

void MQTTSNPublishHandler::handlePuback(Client* client, MQTTSNPacket* packet)
{
    uint16_t topicId;
    uint16_t msgId;
    uint8_t rc;

    if (client->isActive())
    {
        if (packet->getPUBACK(&topicId, &msgId, &rc) == 0)
        {
            return;
        }

        if (rc == MQTTSN_RC_ACCEPTED)
        {
            if (!_gateway->getAdapterManager()->getAggregater()->isActive())
            {
                MQTTGWPacket* pubAck = new MQTTGWPacket();
                pubAck->setAck(PUBACK, msgId);
                Event* ev1 = new Event();
                ev1->setBrokerSendEvent(client, pubAck);
                _gateway->getBrokerSendQue()->post(ev1);
            }
        }
        else if (rc == MQTTSN_RC_REJECTED_INVALID_TOPIC_ID)
        {
            WRITELOG("  PUBACK   %d : Invalid Topic ID\n", msgId);
        }
    }
}

void MQTTSNPublishHandler::handleAck(Client* client, MQTTSNPacket* packet, uint8_t packetType)
{
    uint16_t msgId;

    if (client->isActive())
    {
        if (packet->getACK(&msgId) == 0)
        {
            return;
        }
        MQTTGWPacket* ackPacket = new MQTTGWPacket();
        ackPacket->setAck(packetType, msgId);
        Event* ev1 = new Event();
        ev1->setBrokerSendEvent(client, ackPacket);
        _gateway->getBrokerSendQue()->post(ev1);
    }
}

void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet)
{
    uint16_t id;
    uint16_t msgId;
    MQTTSNString topicName = MQTTSNString_initializer;
    ;
    MQTTSN_topicid topicid;

    if (client->isActive() || client->isAwake())
    {
        if (packet->getREGISTER(&id, &msgId, &topicName) == 0)
        {
            return;
        }

        topicid.type = MQTTSN_TOPIC_TYPE_NORMAL;
        topicid.data.long_.len = topicName.lenstring.len;
        topicid.data.long_.name = topicName.lenstring.data;

        id = client->getTopics()->add(&topicid)->getTopicId();

        MQTTSNPacket* regAck = new MQTTSNPacket();
        regAck->setREGACK(id, msgId, MQTTSN_RC_ACCEPTED);
        Event* ev = new Event();
        ev->setClientSendEvent(client, regAck);
        _gateway->getClientSendQue()->post(ev);
    }
}

void MQTTSNPublishHandler::handleRegAck(Client* client, MQTTSNPacket* packet)
{
    uint16_t id;
    uint16_t msgId;
    uint8_t rc;
    if (client->isActive() || client->isAwake())
    {
        if (packet->getREGACK(&id, &msgId, &rc) == 0)
        {
            return;
        }

        /* get PUBLISH message */
        MQTTSNPacket* regAck = client->getWaitREGACKPacketList()->getPacket(msgId);

        if (regAck != nullptr)
        {
            client->getWaitREGACKPacketList()->erase(msgId);
            Event* ev = new Event();
            ev->setClientSendEvent(client, regAck);
            _gateway->getClientSendQue()->post(ev);
        }

        if (client->isHoldPingReqest() && client->getWaitREGACKPacketList()->getCount() == 0)
        {
            /* send PINGREQ to the broker */
            client->resetPingRequest();
            MQTTGWPacket* pingreq = new MQTTGWPacket();
            pingreq->setHeader(PINGREQ);
            Event* evt = new Event();
            evt->setBrokerSendEvent(client, pingreq);
            _gateway->getBrokerSendQue()->post(evt);
        }
    }

}

void MQTTSNPublishHandler::handleAggregatePublish(Client* client, MQTTSNPacket* packet)
{
    int msgId = 0;
    MQTTGWPacket* publish = handlePublish(client, packet);
    if (publish != nullptr)
    {
        if (publish->getMsgId() > 0)
        {
            if (packet->isDuplicate())
            {
                msgId = _gateway->getAdapterManager()->getAggregater()->getMsgId(client, packet->getMsgId());
            }
            else
            {
                msgId = _gateway->getAdapterManager()->getAggregater()->addMessageIdTable(client, packet->getMsgId());
            }
            publish->setMsgId(msgId);
        }
        Event* ev1 = new Event();
        ev1->setBrokerSendEvent(client, publish);
        _gateway->getBrokerSendQue()->post(ev1);
    }
}

void MQTTSNPublishHandler::handleAggregateAck(Client* client, MQTTSNPacket* packet, int type)
{
    if (type == MQTTSN_PUBREC)
    {
        uint16_t msgId;

        if (packet->getACK(&msgId) == 0)
        {
            return;
        }
        MQTTSNPacket* ackPacket = new MQTTSNPacket();
        ackPacket->setPUBREL(msgId);
        Event* ev = new Event();
        ev->setClientSendEvent(client, ackPacket);
        _gateway->getClientSendQue()->post(ev);
    }
}
